package com.chj.dlx;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * 普通的消费者,但是自己无法消费的消息，将投入死信队列
 */
public class WillMakeDlxConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.200.169.5");
        factory.setVirtualHost("chj_vhost");
        factory.setUsername("chj");
        factory.setPassword("123456");
        // 打开连接和创建频道，与发送端一样
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(DlxProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // TODO 绑定死信交换器 声明一个队列，并绑定死信交换器
        String queueName = "dlx_make";
        Map<String,Object> params = new HashMap<String,Object>();
        params.put("x-dead-letter-exchange", DlxProcessConsumer.DLX_EXCHANGE_NAME);
        //TODO 死信路由键，会替换消息原来的路由键
        params.put("x-dead-letter-routing-key", "deal");
        channel.queueDeclare(queueName,false,true,false,params);
        // 绑定，将队列和交换器通过路由键进行绑定
        channel.queueBind(queueName,DlxProducer.EXCHANGE_NAME,"#");
        System.out.println("waiting for message........");
        // 声明了一个死信消费者
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties,byte[] body) throws IOException {
                // TODO 如果是king的消息确认
                if(envelope.getRoutingKey().equals("king")){
                    System.out.println("Received[" +envelope.getRoutingKey() +"]"+body);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }else{
                    // TODO 如果是其他的消息拒绝（queue=false），成为死信消息
                    System.out.println("Will reject[" +envelope.getRoutingKey()+"]"+body);
                    channel.basicReject(envelope.getDeliveryTag(),false);
                }
            }
        };
        // 消费者正式开始在指定队列上消费消息
        channel.basicConsume(queueName,true,consumer);
    }
}
